From: Brad Jorsch Date: Sun, 18 Mar 2018 03:13:43 +0000 (-0400) Subject: rdbms: Allow PostgreSQL schema-check functions to find temporary tables X-Git-Tag: 1.31.0-rc.0~183^2 X-Git-Url: http://git.cyclocoop.org/%7D%7Cconcat%7B?a=commitdiff_plain;h=43d1f706be54b916923a93512892440e014fcac0;p=lhc%2Fweb%2Fwiklou.git rdbms: Allow PostgreSQL schema-check functions to find temporary tables PostgreSQL puts temporary tables and such in a hidden, per-connection "schema" that's checked for unqualified table accesses before the normal search_path. We should check that in all the schema-checking functions. Change-Id: I1194ac31f31133b177f624138afee19d00e454b9 --- diff --git a/includes/libs/rdbms/database/DatabasePostgres.php b/includes/libs/rdbms/database/DatabasePostgres.php index 9c2478769a..32ea37592f 100644 --- a/includes/libs/rdbms/database/DatabasePostgres.php +++ b/includes/libs/rdbms/database/DatabasePostgres.php @@ -43,6 +43,8 @@ class DatabasePostgres extends Database { private $connectString; /** @var string */ private $coreSchema; + /** @var string */ + private $tempSchema; /** @var string[] Map of (reserved table name => alternate table name) */ private $keywordTableMap = []; @@ -73,15 +75,17 @@ class DatabasePostgres extends Database { } public function hasConstraint( $name ) { - $conn = $this->getBindingHandle(); - - $sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " . - "WHERE c.connamespace = n.oid AND conname = '" . - pg_escape_string( $conn, $name ) . "' AND n.nspname = '" . - pg_escape_string( $conn, $this->getCoreSchema() ) . "'"; - $res = $this->doQuery( $sql ); - - return $this->numRows( $res ); + foreach ( $this->getCoreSchemas() as $schema ) { + $sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " . + "WHERE c.connamespace = n.oid AND conname = " . + $this->addQuotes( $name ) . " AND n.nspname = " . + $this->addQuotes( $schema ); + $res = $this->doQuery( $sql ); + if ( $res && $this->numRows( $res ) ) { + return true; + } + } + return false; } public function open( $server, $user, $password, $dbName ) { @@ -428,59 +432,65 @@ class DatabasePostgres extends Database { public function indexAttributes( $index, $schema = false ) { if ( $schema === false ) { - $schema = $this->getCoreSchema(); - } - /* - * A subquery would be not needed if we didn't care about the order - * of attributes, but we do - */ - $sql = <<<__INDEXATTR__ - - SELECT opcname, - attname, - i.indoption[s.g] as option, - pg_am.amname - FROM - (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g - FROM - pg_index isub - JOIN pg_class cis - ON cis.oid=isub.indexrelid - JOIN pg_namespace ns - ON cis.relnamespace = ns.oid - WHERE cis.relname='$index' AND ns.nspname='$schema') AS s, - pg_attribute, - pg_opclass opcls, - pg_am, - pg_class ci - JOIN pg_index i - ON ci.oid=i.indexrelid - JOIN pg_class ct - ON ct.oid = i.indrelid - JOIN pg_namespace n - ON ci.relnamespace = n.oid - WHERE - ci.relname='$index' AND n.nspname='$schema' - AND attrelid = ct.oid - AND i.indkey[s.g] = attnum - AND i.indclass[s.g] = opcls.oid - AND pg_am.oid = opcls.opcmethod + $schemas = $this->getCoreSchemas(); + } else { + $schemas = [ $schema ]; + } + + $eindex = $this->addQuotes( $index ); + + foreach ( $schemas as $schema ) { + $eschema = $this->addQuotes( $schema ); + /* + * A subquery would be not needed if we didn't care about the order + * of attributes, but we do + */ + $sql = <<<__INDEXATTR__ + + SELECT opcname, + attname, + i.indoption[s.g] as option, + pg_am.amname + FROM + (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g + FROM + pg_index isub + JOIN pg_class cis + ON cis.oid=isub.indexrelid + JOIN pg_namespace ns + ON cis.relnamespace = ns.oid + WHERE cis.relname=$eindex AND ns.nspname=$eschema) AS s, + pg_attribute, + pg_opclass opcls, + pg_am, + pg_class ci + JOIN pg_index i + ON ci.oid=i.indexrelid + JOIN pg_class ct + ON ct.oid = i.indrelid + JOIN pg_namespace n + ON ci.relnamespace = n.oid + WHERE + ci.relname=$eindex AND n.nspname=$eschema + AND attrelid = ct.oid + AND i.indkey[s.g] = attnum + AND i.indclass[s.g] = opcls.oid + AND pg_am.oid = opcls.opcmethod __INDEXATTR__; - $res = $this->query( $sql, __METHOD__ ); - $a = []; - if ( $res ) { - foreach ( $res as $row ) { - $a[] = [ - $row->attname, - $row->opcname, - $row->amname, - $row->option ]; + $res = $this->query( $sql, __METHOD__ ); + $a = []; + if ( $res ) { + foreach ( $res as $row ) { + $a[] = [ + $row->attname, + $row->opcname, + $row->amname, + $row->option ]; + } + return $a; } - } else { - return null; } - - return $a; + return null; } public function indexUnique( $table, $index, $fname = __METHOD__ ) { @@ -784,9 +794,9 @@ __INDEXATTR__; } public function listTables( $prefix = null, $fname = __METHOD__ ) { - $eschema = $this->addQuotes( $this->getCoreSchema() ); + $eschemas = implode( ',', array_map( [ $this, 'addQuotes' ], $this->getCoreSchemas() ) ); $result = $this->query( - "SELECT tablename FROM pg_tables WHERE schemaname = $eschema", $fname ); + "SELECT DISTINCT tablename FROM pg_tables WHERE schemaname IN ($eschemas)", $fname ); $endArray = []; foreach ( $result as $table ) { @@ -977,6 +987,29 @@ __INDEXATTR__; return $this->coreSchema; } + /** + * Return schema names for temporary tables and core application tables + * + * @since 1.31 + * @return string[] schema names + */ + public function getCoreSchemas() { + if ( $this->tempSchema ) { + return [ $this->tempSchema, $this->getCoreSchema() ]; + } + + $res = $this->query( + "SELECT nspname FROM pg_catalog.pg_namespace n WHERE n.oid = pg_my_temp_schema()", __METHOD__ + ); + $row = $this->fetchObject( $res ); + if ( $row ) { + $this->tempSchema = $row->nspname; + return [ $this->tempSchema, $this->getCoreSchema() ]; + } + + return [ $this->getCoreSchema() ]; + } + public function getServerVersion() { if ( !isset( $this->numericVersion ) ) { $conn = $this->getBindingHandle(); @@ -1009,18 +1042,24 @@ __INDEXATTR__; $types = [ $types ]; } if ( $schema === false ) { - $schema = $this->getCoreSchema(); + $schemas = $this->getCoreSchemas(); + } else { + $schemas = [ $schema ]; } $table = $this->realTableName( $table, 'raw' ); $etable = $this->addQuotes( $table ); - $eschema = $this->addQuotes( $schema ); - $sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n " - . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema " - . "AND c.relkind IN ('" . implode( "','", $types ) . "')"; - $res = $this->query( $sql ); - $count = $res ? $res->numRows() : 0; + foreach ( $schemas as $schema ) { + $eschema = $this->addQuotes( $schema ); + $sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n " + . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema " + . "AND c.relkind IN ('" . implode( "','", $types ) . "')"; + $res = $this->query( $sql ); + if ( $res && $res->numRows() ) { + return true; + } + } - return (bool)$count; + return false; } /** @@ -1045,20 +1084,21 @@ __INDEXATTR__; AND tgrelid=pg_class.oid AND nspname=%s AND relname=%s AND tgname=%s SQL; - $res = $this->query( - sprintf( - $q, - $this->addQuotes( $this->getCoreSchema() ), - $this->addQuotes( $table ), - $this->addQuotes( $trigger ) - ) - ); - if ( !$res ) { - return null; + foreach ( $this->getCoreSchemas() as $schema ) { + $res = $this->query( + sprintf( + $q, + $this->addQuotes( $schema ), + $this->addQuotes( $table ), + $this->addQuotes( $trigger ) + ) + ); + if ( $res && $res->numRows() ) { + return true; + } } - $rows = $res->numRows(); - return $rows; + return false; } public function ruleExists( $table, $rule ) { @@ -1066,7 +1106,7 @@ SQL; [ 'rulename' => $rule, 'tablename' => $table, - 'schemaname' => $this->getCoreSchema() + 'schemaname' => $this->getCoreSchemas() ] ); @@ -1074,19 +1114,19 @@ SQL; } public function constraintExists( $table, $constraint ) { - $sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " . - "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s", - $this->addQuotes( $this->getCoreSchema() ), - $this->addQuotes( $table ), - $this->addQuotes( $constraint ) - ); - $res = $this->query( $sql ); - if ( !$res ) { - return null; + foreach ( $this->getCoreSchemas() as $schema ) { + $sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " . + "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s", + $this->addQuotes( $schema ), + $this->addQuotes( $table ), + $this->addQuotes( $constraint ) + ); + $res = $this->query( $sql ); + if ( $res && $res->numRows() ) { + return true; + } } - $rows = $res->numRows(); - - return $rows; + return false; } /** diff --git a/includes/libs/rdbms/field/PostgresField.php b/includes/libs/rdbms/field/PostgresField.php index 600f34a456..53c3d335b1 100644 --- a/includes/libs/rdbms/field/PostgresField.php +++ b/includes/libs/rdbms/field/PostgresField.php @@ -38,30 +38,34 @@ AND attname=%s; SQL; $table = $db->remappedTableName( $table ); - $res = $db->query( - sprintf( $q, - $db->addQuotes( $db->getCoreSchema() ), - $db->addQuotes( $table ), - $db->addQuotes( $field ) - ) - ); - $row = $db->fetchObject( $res ); - if ( !$row ) { - return null; + foreach ( $db->getCoreSchemas() as $schema ) { + $res = $db->query( + sprintf( $q, + $db->addQuotes( $schema ), + $db->addQuotes( $table ), + $db->addQuotes( $field ) + ) + ); + $row = $db->fetchObject( $res ); + if ( !$row ) { + continue; + } + $n = new PostgresField; + $n->type = $row->typname; + $n->nullable = ( $row->attnotnull == 'f' ); + $n->name = $field; + $n->tablename = $table; + $n->max_length = $row->attlen; + $n->deferrable = ( $row->deferrable == 't' ); + $n->deferred = ( $row->deferred == 't' ); + $n->conname = $row->conname; + $n->has_default = ( $row->atthasdef === 't' ); + $n->default = $row->adsrc; + + return $n; } - $n = new PostgresField; - $n->type = $row->typname; - $n->nullable = ( $row->attnotnull == 'f' ); - $n->name = $field; - $n->tablename = $table; - $n->max_length = $row->attlen; - $n->deferrable = ( $row->deferrable == 't' ); - $n->deferred = ( $row->deferred == 't' ); - $n->conname = $row->conname; - $n->has_default = ( $row->atthasdef === 't' ); - $n->default = $row->adsrc; - return $n; + return null; } function name() {